-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-19144 Move DelayedProduce to server module #19793
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
KAFKA-19144 Move DelayedProduce to server module #19793
Conversation
03e76dc
to
9f2951e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch. Leave some minor comments.
val produceResponseStatus = initialProduceStatus.map { case (k, status) => k -> status.responseStatus } | ||
responseCallback(produceResponseStatus) | ||
responseCallback(produceResponseStatus.asJava) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use initialProduceStatus
to build a Scala map and transfer to Java. Probably, we can build a Java map directly, so we can avoid asJava
here.
val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]()
initialProduceStatus.foreach { case (k, status) => produceResponseStatus.put(k, status.responseStatus()) }
responseCallback(produceResponseStatus)
@@ -1006,7 +1026,7 @@ class ReplicaManager(val config: KafkaConfig, | |||
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset | |||
) | |||
} | |||
responseCallback(responseStatus) | |||
responseCallback(responseStatus.asJava) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
val responseStatus = new util.HashMap[TopicIdPartition, PartitionResponse]()
entries.foreach { case (topicIdPartition, _) =>
responseStatus.put(topicIdPartition, new PartitionResponse(
Errors.INVALID_REQUIRED_ACKS,
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
RecordBatch.NO_TIMESTAMP,
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
))
}
responseCallback(responseStatus)
I will review this PR after #19798 gets merged. |
@johnny94 please fix the conflicts |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@johnny94 Thanks for the patch!
I have a few comments. PTAL
def delegate(tp: TopicPartition, status: ProducePartitionStatus) : Unit = { | ||
val (hasEnough, error) = getPartitionOrError(tp) match { | ||
case Left(err) => | ||
// Case A |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does this comment mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see this is relevant to the comment of DelayedProduce,
but it's confusing that these comments are standalone here.
Could you write the whole meaning of these cases, or link these comments to tryComplete
?
partition.checkEnoughReplicasReachOffset(status.requiredOffset) | ||
} | ||
|
||
// Case B || C.1 || C.2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
def appendCallback(responses: scala.collection.Map[TopicIdPartition, PartitionResponse]): Unit = { | ||
val response = responses.get(partition) | ||
assertTrue(response.isDefined) | ||
def appendCallback(responses: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given that we already have an import alias for java.util.Map
, we could reuse it.
def appendCallback(responses: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = { | |
def appendCallback(responses: JMap[TopicIdPartition, PartitionResponse]): Unit = { |
@@ -1104,7 +1104,7 @@ class TransactionStateManagerTest { | |||
capturedAppends: mutable.Map[TopicIdPartition, mutable.Buffer[MemoryRecords]] | |||
): Unit = { | |||
val recordsCapture: ArgumentCaptor[Map[TopicIdPartition, MemoryRecords]] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, MemoryRecords]]) | |||
val callbackCapture: ArgumentCaptor[Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, PartitionResponse] => Unit]) | |||
val callbackCapture: ArgumentCaptor[java.util.Map[TopicIdPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[java.util.Map[TopicIdPartition, PartitionResponse] => Unit]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please import java.util
@@ -2832,7 +2832,7 @@ class KafkaApisTest extends Logging { | |||
any(), | |||
ArgumentMatchers.eq(requestLocal), | |||
any() | |||
)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)))) | |||
)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)).asJava)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)).asJava)) | |
)).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new TopicIdPartition(topicId,tp2), new PartitionResponse(Errors.NONE)))) |
val response = responses.get(topicIdPartition) | ||
assertTrue(response.isDefined) | ||
def appendCallback(responses: util.Map[TopicIdPartition, PartitionResponse]): Unit = { | ||
val response = java.util.Optional.ofNullable(responses.get(topicIdPartition)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already imported java.util.Optional
, so we don't need a full-qualified name.
PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, | ||
key -> METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, | ||
Map.of("topic", key.topic(), "partition", String.valueOf(key.partition())))) | ||
.mark(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style nit:
PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, | |
key -> METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, | |
Map.of("topic", key.topic(), "partition", String.valueOf(key.partition())))) | |
.mark(); | |
PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, | |
key -> METRICS_GROUP.newMeter("ExpiresPerSec", | |
"requests", | |
TimeUnit.SECONDS, | |
Map.of("topic", key.topic(), "partition", String.valueOf(key.partition()))) | |
).mark(); |
boolean anyPending = produceMetadata.produceStatus | ||
.values() | ||
.stream() | ||
.anyMatch(ProducePartitionStatus::acksPending); | ||
if (!anyPending) { | ||
return forceComplete(); | ||
} | ||
|
||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
boolean anyPending = produceMetadata.produceStatus | |
.values() | |
.stream() | |
.anyMatch(ProducePartitionStatus::acksPending); | |
if (!anyPending) { | |
return forceComplete(); | |
} | |
return false; | |
return produceMetadata.produceStatus.values() | |
.stream() | |
.findAny() | |
.map(__ -> false) | |
.orElseGet(this::forceComplete); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, I think it keeps the idea of original implementation and easier to read. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm okay with it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @johnny94 for this patch, left some comments
val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse] | ||
initialProduceStatus.foreach { case (k, status) => k -> produceResponseStatus.put(k, status.responseStatus) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val produceResponseStatus = new util.HashMap[TopicIdPartition, PartitionResponse] | |
initialProduceStatus.foreach { case (k, status) => k -> produceResponseStatus.put(k, status.responseStatus) } | |
val produceResponseStatus = initialProduceStatus.map { case (k, status) => | |
k -> status.responseStatus | |
}.asJava |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's ok to use HashMap
instead of asJava
conversion.
val response = Optional.ofNullable(responses.get(partition)) | ||
|
||
assertTrue(response.isPresent) | ||
result.fire(response.get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val response = Optional.ofNullable(responses.get(partition)) | |
assertTrue(response.isPresent) | |
result.fire(response.get) | |
val response = responses.get(partition) | |
assertNotNull(response) | |
result.fire(response) |
val response = java.util.Optional.ofNullable(responses.get(topicIdPartition)) | ||
assertTrue(response.isPresent) | ||
result.fire(response.get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val response = java.util.Optional.ofNullable(responses.get(topicIdPartition)) | |
assertTrue(response.isPresent) | |
result.fire(response.get) | |
val response = responses.get(topicIdPartition) | |
assertNotNull(response) | |
result.fire(response) |
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.kafka.server; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Last time, we moved DelayedDeleteRecords
to org.apache.kafka.server.purgatory
(see 2994e5e). I'm curious, why we chose a different location for DelayedProduce?
LOGGER.trace("Checking produce satisfaction for {}, current status {}", topicIdPartition, status); | ||
// skip those partitions that have already been satisfied | ||
if (status.acksPending) { | ||
// Delegate to `ReplicaManager#maybeAddDelayedProduc` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybeAddDelayedProduc -> maybeAddDelayedProduce
This PR moves
DelayedProduce
to the server module. One notable change is that the type of theresponseCallback
parameter inReplicaManager#appendRecords()
has been changed to a JavaMap
. Other related type changes have been made accordingly.